package client

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/OffchainLabs/prysm/v7/api/client"
	fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
	"github.com/OffchainLabs/prysm/v7/config/params"
	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
	"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
	prysmTrace "github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
	"github.com/OffchainLabs/prysm/v7/time/slots"
	"github.com/OffchainLabs/prysm/v7/validator/client/iface"
	"github.com/pkg/errors"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Time to wait before trying to reconnect with beacon node.
var backOffPeriod = 10 * time.Second

// runner encapsulates the main validator routine.
type runner struct {
	validator     iface.Validator
	healthMonitor *healthMonitor
}

// newRunner creates a new runner instance and performs all necessary initialization.
// This function can return an error if initialization fails.
//
// Order of operations:
// 1 - Initialize validator data
// 2 - Wait for validator activation
func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (*runner, error) {
	// Initialize validator and get head slot
	err := initialize(ctx, v)
	if err != nil {
		v.Done()
		return nil, err
	}
	currentSlot := slots.CurrentSlot(v.GenesisTime()) // set in v.WaitForChainStart
	// Prepare initial duties update
	ss, err := slots.EpochStart(slots.ToEpoch(currentSlot + 1))
	if err != nil {
		log.WithError(err).Error("Failed to get epoch start")
		ss = currentSlot
	}
	startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
	startCtx, startCancel := context.WithDeadline(ctx, startDeadline)
	if err := v.UpdateDuties(startCtx); err != nil {
		// Don't return error here, just log it
		handleAssignmentError(err, currentSlot)
	}
	startCancel()

	// check if proposer settings is still nil
	// Set properties on the beacon node like the fee recipient for validators that are being used & active.
	if v.ProposerSettings() == nil {
		log.Warn("Validator client started without proposer settings such as fee recipient" +
			" and will continue to use settings provided in the beacon node.")
	}
	if err := v.PushProposerSettings(ctx, currentSlot, true); err != nil {
		v.Done()
		return nil, errors.Wrap(err, "failed to update proposer settings")
	}
	return &runner{
		validator:     v,
		healthMonitor: monitor,
	}, nil
}

// run executes the main validator routine. This routine exits if the context is
// canceled. It returns a channel that will be closed when the routine exits.
//
// Order of operations:
// 1 - Wait for the next slot start
// 2 - Update assignments if needed
// 3 - Determine role at current slot
// 4 - Perform assigned role, if any
func (r *runner) run(ctx context.Context) {
	v := r.validator
	cleanup := v.Done
	defer cleanup()
	v.SetTicker()
	for {
		select {
		case <-ctx.Done():
			log.Info("Context canceled, stopping validator")
			//nolint:govet
			return // Exit if context is canceled.
		case slot := <-v.NextSlot():
			if !r.healthMonitor.IsHealthy() {
				log.Warn("Beacon node unhealthy, stopping runner")
				return
			}

			deadline := v.SlotDeadline(slot)
			slotCtx, cancel := context.WithDeadline(ctx, deadline) //nolint:govet

			var span trace.Span
			slotCtx, span = prysmTrace.StartSpan(slotCtx, "validator.processSlot")
			span.SetAttributes(prysmTrace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.

			log := log.WithField("slot", slot)
			log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")

			// Keep trying to update assignments if they are nil or if we are past an
			// epoch transition in the beacon node's state.
			if slots.IsEpochStart(slot) {
				deadline = v.SlotDeadline(slot + params.BeaconConfig().SlotsPerEpoch - 1)
				dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline)
				if err := v.UpdateDuties(dutiesCtx); err != nil {
					handleAssignmentError(err, slot)
					dutiesCancel()
					span.End()
					cancel()
					continue
				}
				dutiesCancel()
			}

			// call push proposer settings often to account for the following edge cases:
			// proposer is activated at the start of epoch and tries to propose immediately
			// account has changed in the middle of an epoch
			if err := v.PushProposerSettings(slotCtx, slot, false); err != nil {
				log.WithError(err).Warn("Failed to update proposer settings")
			}

			// Start fetching domain data for the next epoch.
			if slots.IsEpochEnd(slot) {
				domainCtx, _ := context.WithDeadline(ctx, deadline) //nolint:govet
				go v.UpdateDomainDataCaches(domainCtx, slot+1)
			}

			var wg sync.WaitGroup

			allRoles, err := v.RolesAt(slotCtx, slot)
			if err != nil {
				log.WithError(err).Error("Could not get validator roles")
				span.End()
				cancel()
				continue
			}
			// performRoles calls span.End()
			rolesCtx, _ := context.WithDeadline(ctx, deadline) //nolint:govet
			performRoles(rolesCtx, allRoles, v, slot, &wg, span)
		case e := <-v.EventsChan():
			v.ProcessEvent(ctx, e)
		case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot
			onAccountsChanged(ctx, v, currentKeys)
		}
	}
}

func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte) {
	ctx, span := prysmTrace.StartSpan(ctx, "validator.accountsChanged")
	defer span.End()

	anyActive, err := v.HandleKeyReload(ctx, current)
	if err != nil {
		log.WithError(err).Error("Could not properly handle reloaded keys")
	}
	if !anyActive {
		log.Warn("No active keys found. Waiting for activation...")
		err := v.WaitForActivation(ctx)
		if err != nil {
			log.WithError(err).Warn("Could not wait for validator activation")
		} else {
			log.Debug("Resetting slot ticker after waiting for validator activation.")
			v.SetTicker()
		}
	}
}

func initialize(ctx context.Context, v iface.Validator) error {
	ctx, span := prysmTrace.StartSpan(ctx, "validator.initialize")
	defer span.End()

	ticker := time.NewTicker(backOffPeriod)
	defer ticker.Stop()

	firstTime := true

	for {
		if !firstTime {
			if ctx.Err() != nil {
				log.Info("Context canceled, stopping validator")
				return errors.New("context canceled")
			}
			<-ticker.C
		}

		firstTime = false

		if err := v.WaitForChainStart(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not determine if beacon chain started")
				continue
			}

			return errors.Wrap(err, "could not determine if beacon chain started")
		}

		if err := v.WaitForKeymanagerInitialization(ctx); err != nil {
			return errors.Wrap(err, "Wallet is not ready")
		}

		if err := v.WaitForSync(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not determine if beacon chain started")
				continue
			}

			return errors.Wrap(err, "could not determine if beacon node synced")
		}

		if err := v.WaitForActivation(ctx); err != nil {
			return errors.Wrap(err, "could not wait for validator activation")
		}

		if err := v.CheckDoppelGanger(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not wait for checking doppelganger")
				continue
			}

			return errors.Wrap(err, "could not succeed with doppelganger check")
		}
		break
	}

	return nil
}

func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.ValidatorRole, v iface.Validator, slot primitives.Slot, wg *sync.WaitGroup, span trace.Span) {
	for pubKey, roles := range allRoles {
		wg.Add(len(roles))
		for _, role := range roles {
			go func(role iface.ValidatorRole, pubKey [fieldparams.BLSPubkeyLength]byte) {
				defer wg.Done()
				switch role {
				case iface.RoleAttester:
					v.SubmitAttestation(slotCtx, slot, pubKey)
				case iface.RoleProposer:
					v.ProposeBlock(slotCtx, slot, pubKey)
				case iface.RoleAggregator:
					v.SubmitAggregateAndProof(slotCtx, slot, pubKey)
				case iface.RoleSyncCommittee:
					v.SubmitSyncCommitteeMessage(slotCtx, slot, pubKey)
				case iface.RoleSyncCommitteeAggregator:
					v.SubmitSignedContributionAndProof(slotCtx, slot, pubKey)
				case iface.RoleUnknown:
					log.WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).Trace("No active roles, doing nothing")
				default:
					log.Warnf("Unhandled role %v", role)
				}
			}(role, pubKey)
		}
	}

	// Wait for all processes to complete, then report span complete.
	go func() {
		wg.Wait()
		defer span.End()
		defer func() {
			if err := recover(); err != nil { // catch any panic in logging
				log.WithField("error", err).
					Error("Panic occurred when logging validator report. This" +
						" should never happen! Please file a report at github.com/prysmaticlabs/prysm/issues/new")
			}
		}()
		// Log performance in the previous slot
		v.LogSubmittedAtts(slot)
		v.LogSubmittedSyncCommitteeMessages()
		if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
			log.WithError(err).Error("Could not report validator's rewards/penalties")
		}
	}()
}

func isConnectionError(err error) bool {
	return err != nil && errors.Is(err, client.ErrConnectionIssue)
}

func handleAssignmentError(err error, slot primitives.Slot) {
	if errors.Is(err, ErrValidatorsAllExited) {
		log.Warn(ErrValidatorsAllExited)
	} else if errCode, ok := status.FromError(err); ok && errCode.Code() == codes.NotFound {
		log.WithField(
			"epoch", slot/params.BeaconConfig().SlotsPerEpoch,
		).Warn("Validator not yet assigned to epoch")
	} else {
		log.WithError(err).Error("Failed to update assignments")
	}
}
